package flinkstudy.table.mock;

import flinkstudy.Constant;
import flinkstudy.bo.HouseInfo;
import flinkstudy.bo.HouseInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;

import java.io.File;

/**
 * 房源  csv 数据
 *
 * @author daocr
 * @date 2019/12/14
 */
public class HouseInfoCsvDataSource {

    /**
     * 小区表列
     */
    public static String HOUSE_INFO_COLUMN = "houseId,cityId,houseName,districtId,plateId";

    /**
     * 小区表
     */
    public static final String T_HOUSE_INFO = "t_house_info";

    public static DataSource<HouseInfo> of(ExecutionEnvironment env) {

        // 抽取 UserBehavior 的 TypeInformation，是一个 PojoTypeInfo
        PojoTypeInfo<HouseInfo> pojoType = (PojoTypeInfo<HouseInfo>) TypeExtractor.createTypeInfo(HouseInfo.class);
        // 由于 Java 反射抽取出的字段顺序是不确定的，需要显式指定下文件中字段的顺序
        String[] fieldOrder = new String[]{"houseId", "cityId", "houseName", "districtId", "plateId"};

        // 创建 PojoCsvInputFormat
        PojoCsvInputFormat<HouseInfo> csvInput = new PojoCsvInputFormat<>(Path.fromLocalFile(new File(Constant.FilePath.HOUSE_CSV_PATH)), pojoType, fieldOrder);

        DataSource<HouseInfo> input = env.createInput(csvInput, pojoType);

        return input;
    }

}
